/*
 * Copyright 2005 Stephen J. McConnell.
 *
 * Licensed  under the  Apache License,  Version 2.0  (the "License");
 * you may not use  this file  except in  compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed  under the  License is distributed on an "AS IS" BASIS,
 * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
 * implied.
 *
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.dpml.station.server;

import java.rmi.AlreadyBoundException;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
import java.net.URL;
import java.util.Map;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.EventObject;

import net.dpml.station.Application;
import net.dpml.station.Callback;
import net.dpml.station.Manager;
import net.dpml.station.Station;
import net.dpml.station.StationException;

import net.dpml.station.info.ApplicationDescriptor;
import net.dpml.station.info.StartupPolicy;
import net.dpml.station.ApplicationRegistry;

import net.dpml.util.Logger;
import net.dpml.transit.model.TransitModel;
import net.dpml.transit.Disposable;

import net.dpml.lang.UnknownKeyException;

/**
 * The RemoteStation is responsible for the establishment of
 * callback monitors to external processes established by the
 * station manager.
 *
 * <p>On construction the station binds itself into an RMI registry on the
 * supplied port, registers a JVM shutdown hook, starts the shared event
 * dispatch thread and a logging server, loads the application registry and
 * starts every application whose startup policy is automatic.</p>
 *
 * @author <a href="http://www.dpml.net">Digital Product Meta Library</a>
 * @version 1.0.1
 */
public class RemoteStation extends UnicastRemoteObject implements Station, Manager
{
   /**
    * Port on which the station logging server is started.
    */
    private static final int LOGGING_PORT = 2020;

    private final RemoteApplicationRegistry m_registry;
    private final Map m_applications = new Hashtable();
    private final Logger m_logger;
    private final int m_port;
    private final Registry m_rmiRegistry;
    private final URL m_store;
    private final TransitModel m_model;
    private final LoggingServer m_server;
    private final Thread m_thread;

    // guarded by the m_applications monitor
    private boolean m_terminated = false;

   /**
    * Creation of a station instance.
    *
    * @param logger the assigned logging channel
    * @param model the transit model
    * @param port the station port
    * @param registryStorageUrl uri defining the registry backing store
    * @exception Exception if a exception occurs during establishment
    */
    public RemoteStation( 
      Logger logger, TransitModel model, int port, URL registryStorageUrl ) 
      throws Exception
    {
        super();

        m_logger = logger;
        m_port = port;
        m_store = registryStorageUrl;
        m_model = model;

        m_rmiRegistry = getLocalRegistry( port );

        try
        {
            m_rmiRegistry.bind( STATION_KEY, this );
        }
        catch( AlreadyBoundException e )
        {
            final String error = 
              "An instance of the Station is already bound to port " + port;
            throw new StationException( error, e );
        }

        setShutdownHook( this );
        startEventDispatchThread();

        try
        {
            m_server = new LoggingServer( LOGGING_PORT );
            m_thread = new Thread( m_server, "DPML Station Logging Server" );
            m_thread.start();
        }
        catch( Exception e )
        {
            // The station was bound above; release the binding so that the
            // registry does not keep a reference to a station that failed
            // to establish itself.
            unbindStation();
            final String error = 
              "Unexpected error while attempting to start the logging server on port " + LOGGING_PORT;
            throw new StationException( error, e );
        }

        if( getLogger().isDebugEnabled() )
        {
            if( null == registryStorageUrl )
            {
                getLogger().debug( "loading registry from default storage" );
            }
            else
            {
                getLogger().debug( "loading registry from [" + registryStorageUrl + "]" );
            }
        }

        m_registry = new RemoteApplicationRegistry( logger, registryStorageUrl );
        String[] keys = m_registry.getKeys();

        if( getLogger().isDebugEnabled() )
        {
            getLogger().debug( "registry established (" + keys.length + ")" );
        }

        // start every application flagged for automatic startup
        for( int i=0; i<keys.length; i++ )
        {
            String key = keys[i];
            try
            {
                ApplicationDescriptor descriptor = 
                  m_registry.getApplicationDescriptor( key );
                if( StartupPolicy.AUTOMATIC.equals( descriptor.getStartupPolicy() ) )
                {
                    RemoteApplication application = getRemoteApplication( key );
                    application.start();
                }
            }
            catch( UnknownKeyException e )
            {
                // keys were just obtained from the registry itself
                throw new RuntimeException( e ); // will not happen
            }
        }
    }

   /**
    * Return a string containing info about the general setup of the station.
    * The array holds the station port, the registry store, the current 
    * working directory and the location of the station codebase.
    * @return station configuration info
    */
    public String[] getInfo()
    {
        String[] values = new String[4];
        values[0] = "Port: " + m_port;
        values[1] = "Store: " + m_store;
        values[2] = "Basedir: " + System.getProperty( "user.dir" );
        values[3] = "Codebase: " 
          + getClass().getProtectionDomain().getCodeSource().getLocation();
        return values;
    }

   /**
    * Return an callback handler for the supplied id.
    * @param id the callback id
    * @return the callback handler
    * @exception UnknownKeyException if the id is unknown
    * @exception RemoteException if a remote error occurs
    */
    public Callback getCallback( String id ) throws UnknownKeyException, RemoteException
    {
        // TODO: improve this so that this is only called once per appliation
        return getRemoteApplication( id );
    }

   /**
    * Shutdown the station and terminate the process.
    */
    public void shutdown()
    {
        shutdown( true );
    }

   /**
    * Shutdown the station.  The operation is executed at most once; 
    * subsequent invocations return immediately.  Every shutdown step is 
    * handled on a best-effort basis so that a failure in one step (or in 
    * one application) does not prevent the remaining steps from running.
    *
    * @param exit if true launch a process termination
    */
    private void shutdown( boolean exit )
    {
        synchronized( m_applications )
        {
            if( m_terminated )
            {
                return;
            }
            m_terminated = true;

            if( getLogger().isInfoEnabled() )
            {
                getLogger().info( "initiating station shutdown" );
            }

            try
            {
                unbindStation();
                shutdownApplications();
                retractRegistry();
                stopLoggingServer();
            }
            finally
            {
                if( getLogger().isInfoEnabled() )
                {
                    getLogger().info( "station shutdown complete" );
                }
                if( exit )
                {
                    terminate();
                }
            }
        }
    }

   /**
    * Remove the station binding from the RMI registry (best-effort).
    */
    private void unbindStation()
    {
        try
        {
            m_rmiRegistry.unbind( STATION_KEY );
        }
        catch( Exception e )
        {
            // e.g. not bound or registry unreachable - nothing more to do
            getLogger().debug( "station unbind skipped: " + e );
        }
    }

   /**
    * Shutdown and unexport every known application.  Each application is 
    * handled independently so that a failing application does not block 
    * the shutdown of the others.
    */
    private void shutdownApplications()
    {
        RemoteApplication[] applications = getRemoteApplications();
        for( int i=0; i<applications.length; i++ )
        {
            RemoteApplication application = applications[i];
            try
            {
                application.shutdown();
            }
            catch( Exception e )
            {
                getLogger().warn( "error while shutting down an application", e );
            }
            try
            {
                UnicastRemoteObject.unexportObject( application, true );
            }
            catch( Exception e )
            {
                getLogger().warn( "error while unexporting an application", e );
            }
        }
    }

   /**
    * Unexport the application registry (best-effort).
    */
    private void retractRegistry()
    {
        try
        {
            UnicastRemoteObject.unexportObject( m_registry, true );
        }
        catch( Exception e )
        {
            // also covers a station that never completed construction
            getLogger().debug( "registry unexport skipped: " + e );
        }
    }

   /**
    * Report logging server errors and interrupt the logging server thread.
    * The fields may be null if the shutdown hook fires for a station whose
    * construction did not complete.
    */
    private void stopLoggingServer()
    {
        if( null != m_server )
        {
            try
            {
                int n = m_server.getErrorCount();
                if( n > 0 )
                {
                    getLogger().warn( "logging issues: " + n );
                }
            }
            catch( Exception e )
            {
                getLogger().debug( "logging server error count unavailable: " + e );
            }
        }
        if( null != m_thread )
        {
            m_thread.interrupt();
        }
    }

   /**
    * Dispose of the transit model (if disposable) and launch a thread that
    * stops the event dispatch thread and exits the process.
    */
    private void terminate()
    {
        if( m_model instanceof Disposable )
        {
            try
            {
                Disposable disposable = (Disposable) m_model;
                disposable.dispose();
            }
            catch( Exception e )
            {
                getLogger().debug( "transit model disposal error: " + e );
            }
        }

        if( getLogger().isDebugEnabled() )
        {
            getLogger().debug( "terminating process" );
        }

        Thread thread = new Thread(
          new Runnable()
          {
            public void run()
            {
                stopEventDispatchThread();
                System.exit( 0 );
            }
          }
        );
        thread.start();
    }

   /**
    * Return the application registry.
    * @return the registry
    */
    public ApplicationRegistry getApplicationRegistry()
    {
        return m_registry;
    }

   /**
    * Return an application reference for the supplied key.
    * @param key the application key
    * @return the application
    * @exception UnknownKeyException if the key is unknown
    * @exception RemoteException if a remote error occurs
    */
    public Application getApplication( String key ) throws UnknownKeyException, RemoteException
    {
        return getRemoteApplication( key );
    }

   /**
    * Return the remote application for the supplied key, creating and 
    * caching it on first request.
    * @param key the application key
    * @return the application
    * @exception UnknownKeyException if the key is unknown
    * @exception RemoteException if a remote error occurs
    */
    RemoteApplication getRemoteApplication( String key ) throws UnknownKeyException, RemoteException
    {
        synchronized( m_applications )
        {
            RemoteApplication application = (RemoteApplication) m_applications.get( key );
            if( null == application )
            {
                Logger logger = getLogger().getChildLogger( key );
                ApplicationDescriptor descriptor = m_registry.getApplicationDescriptor( key );
                application = new RemoteApplication( logger, descriptor, key, m_port );
                m_applications.put( key, application );
            }
            return application;
        }
    }

   /**
    * Return an array of all remote applications created so far.
    * @return the applications array
    */
    RemoteApplication[] getRemoteApplications()
    {
        synchronized( m_applications )
        {
            return (RemoteApplication[]) m_applications.values().toArray( new RemoteApplication[0] );
        }
    }

    private Logger getLogger()
    {
        return m_logger;
    }

   /**
    * Create an RMI registry on the supplied port, falling back to a 
    * reference to a registry on that port if creation fails.
    * @param port the registry port
    * @return the registry
    * @exception RemoteException if the registry reference cannot be created
    */
    private Registry getLocalRegistry( int port ) throws RemoteException
    {
        try
        {
            Registry registry = LocateRegistry.createRegistry( port );
            getLogger().debug( "created local registry on port " + port );
            return registry;
        }
        catch( RemoteException e )
        {
            Registry registry = LocateRegistry.getRegistry( port );
            getLogger().debug( "using local registry on port " + port );
            return registry;
        }
    }

   /**
    * Queue of pending notification events.  When an event occurs it is 
    * placed on this queue and the queue is notified.  A background thread 
    * waits on this queue and delivers the events.  This decouples event 
    * delivery from the application concern, greatly simplifying locking 
    * and reducing opportunity for deadlock.
    */
    private static final List EVENT_QUEUE = new LinkedList();

   /**
    * Enqueue an event for delivery by the event dispatch thread.
    * @param event the event to enqueue
    * @exception NullPointerException if the event is null
    */
    static void enqueueEvent( EventObject event )
    {
        if( null == event )
        {
            // fail in the caller instead of killing the dispatch thread later
            throw new NullPointerException( "event" );
        }
        synchronized( EVENT_QUEUE )
        {
            EVENT_QUEUE.add( event );
            EVENT_QUEUE.notify();
        }
    }

   /**
    * A single background thread ("the event notification thread") monitors
    * the event queue and delivers events that are placed on the queue.
    */
    private static class EventDispatchThread extends Thread
    {
        private final Logger m_logger;

        // written under the EVENT_QUEUE monitor, read by the dispatch loop
        private volatile boolean m_continue = true;

        EventDispatchThread( Logger logger )
        {
            super( "DPML Station Event Dispatch" );
            m_logger = logger;
            m_logger.debug( "starting event dispatch thread" );
        }

       /**
        * Request termination of the dispatch loop and wake the thread 
        * if it is waiting on the queue.
        */
        void dispose()
        {
            synchronized( EVENT_QUEUE )
            {
                m_logger.debug( "stopping event dispatch thread" );
                m_continue = false;
                EVENT_QUEUE.notifyAll();
            }
        }

        public void run()
        {
            while( m_continue )
            {
                // Wait on EVENT_QUEUE till an event is present or 
                // termination has been requested
                EventObject event = null;
                synchronized( EVENT_QUEUE )
                {
                    try
                    {
                        while( m_continue && EVENT_QUEUE.isEmpty() )
                        {
                            EVENT_QUEUE.wait();
                        }
                    }
                    catch( InterruptedException e )
                    {
                        // restore the interrupt status and terminate
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if( !m_continue )
                    {
                        break;
                    }
                    Object object = EVENT_QUEUE.remove( 0 );
                    if( !( object instanceof EventObject ) )
                    {
                        final String error = 
                          "Unexpected class cast exception while processing an event." 
                          + "\nEvent: " + object;
                        throw new IllegalStateException( error );
                    }
                    event = (EventObject) object;
                }

                // deliver outside of the queue monitor
                Object source = event.getSource();
                if( source instanceof UnicastEventSource )
                {
                    UnicastEventSource producer = (UnicastEventSource) source;
                    try
                    {
                        producer.processEvent( event );
                    }
                    catch( Throwable e )
                    {
                        final String error = 
                          "Unexpected error while processing event."
                          + "\nEvent: " + event
                          + "\nSource: " + source;
                        m_logger.warn( error, e );
                    }
                }
                else
                {
                    final String error = 
                      "Event source [" 
                      + source.getClass().getName()
                      + "] is not an instance of " + UnicastEventSource.class.getName();
                    throw new IllegalStateException( error );
                }
            }

            m_logger.info( "Controller event queue terminating." );
        }
    }

    // guarded by the RemoteStation.class monitor
    private static EventDispatchThread m_DISPATCH = null;

   /**
    * This method starts the event dispatch thread the first time it
    * is called.  The thread is shared by all station instances and is
    * started as a daemon using the logger of the first station.
    */
    private void startEventDispatchThread()
    {
        // m_DISPATCH is static, so the lock must be class-wide
        synchronized( RemoteStation.class )
        {
            if( m_DISPATCH == null )
            {
                Logger logger = getLogger();
                m_DISPATCH = new EventDispatchThread( logger );
                m_DISPATCH.setDaemon( true );
                m_DISPATCH.start();
            }
        }
    }

   /**
    * Request termination of the event dispatch thread if it was started.
    */
    private static void stopEventDispatchThread()
    {
        synchronized( RemoteStation.class )
        {
            if( m_DISPATCH != null )
            {
                m_DISPATCH.dispose();
            }
        }
    }

   /**
    * Create a shutdown hook that will trigger shutdown of the supplied station.
    * @param station the station
    */
    public static void setShutdownHook( final RemoteStation station )
    {
        //
        // Create a shutdown hook to trigger clean disposal of the
        // station
        //

        Runtime.getRuntime().addShutdownHook(
          new Thread()
          {
              public void run()
              {
                  try
                  {
                      station.shutdown();
                  }
                  catch( Throwable e )
                  {
                      System.err.println( e.toString() );
                  }
                  System.runFinalization();
              }
          }
        );
    }

}